In [2]:
%matplotlib inline
from __future__ import division
import scipy
from scipy.stats import binom, uniform
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.cm as cm
from math import sqrt, pi, exp, log
plt.rcParams['figure.figsize'] = (15.0, 15.0)
plt.rcParams['font.size'] = 10
plt.rcParams['font.family'] = 'sans-serif'
plt.rcParams['font.sans-serif'] = ['Tahoma']
plt.rcParams['axes.labelsize'] = 1.5*plt.rcParams['font.size']
plt.rcParams['axes.titlesize'] = 1.5*plt.rcParams['font.size']
plt.rcParams['legend.fontsize'] = plt.rcParams['font.size']
plt.rcParams['xtick.labelsize'] = 1.5*plt.rcParams['font.size']
plt.rcParams['ytick.labelsize'] = 1.5*plt.rcParams['font.size']
T = 1000
scale=1
colors = cm.rainbow(np.linspace(0, 1, 6))
scipy.set_printoptions(precision = 4, suppress = True)
V = ['0','1','2','3','4','5']
Q = ['-1','0','1']
M = 5
In [3]:
def b(x, q):
p = binom.pmf(x, M, 0.5+0.25*q, loc=0)
return p
def Yt(q):
q=float(q)
return str(binom.rvs(M, 0.5+0.25*q))
def YtDiscrete(q):
values = ['-1', '0', '1']
probabilities = [0.005, 0.005, 0.005]
for i, x in enumerate(values):
if x==q:
probabilities[i] = 0.990
return np.random.choice(values, p=probabilities)
def alpha_pass2(O,A,B,PI):
alpha = [{} for t in range(0,T)]
c = [0 for t in range(0,T)]
alpha[0] = {i: PI[i]*B[i][O[0]] for i in Q}
c[0] = sum(alpha[0].values())
c[0]=1/c[0]
alpha[0].update((x, y*c[0]) for x, y in alpha[0].items())
for t in range(1,T):
for i in Q:
alpha[t][i] = sum([A[j][i]*alpha[t-1][i] for j in Q])
alpha[t][i] = alpha[t][i]*B[i][yt[t]]
c[t] = c[t]+alpha[t][i]
c[t]=1/c[t]
alpha[t].update((x, y*c[t]) for x, y in alpha[t].items())
return {'alpha': alpha, 'c': c}
def beta_pass2(O,A,B,c):
beta = [{} for t in range(0,T)]
beta[T-1] = {i: c[T-1] for i in Q}
for t in range(T-2,-1,-1):
for i in Q:
beta[t][i] = sum([A[i][j] *B[j][O[t+1]] *beta[t+1][j] for j in Q])
beta[t][i] = c[t]*beta[t][i]
return beta
def gamma_pass2(O, A, B, c, al, be):
gammaij = [{} for t in range(0,T)]
gamma = [{} for t in range(0,T)]
for t in range(0,T-1):
denom = 0
for i in Q:
for j in Q:
denom = denom + al[t][i]*A[i][j]*B[j][O[t+1]]*be[t+1][j]
for i in Q:
gammaij[t][i] = {}
for j in Q:
gammaij[t][i][j] = al[t][i]*A[i][j]*B[j][O[t+1]]*be[t+1][j]/denom
gamma[t][i] = sum(gammaij[t][i].values())
denom = sum(alpha[T-1].values())
for i in Q:
gamma[T-1][i] = al[T-1][i]/denom
return {'gamma': gamma, 'gammaij': gammaij}
def viterbi(yt,PI,A,B):
delta = [{} for t in range(0,T)]
path = {}
for q in Q:
delta[0][q] = log(PI[q])+log(B[q][yt[0]])
path[q] = [q]
#print path
for t in range(1,T):
tempath = {}
for q in Q:
(Z, state) = max(( delta[t-1][x] +log(B[q][yt[t]]) + log(A[x][q]),x) for x in Q)
delta[t][q] = Z
#print(q,state)
tempath[q] = path[state]+[q]
path = tempath
#print path
#print(delta)
(p, state) = max((delta[T-1][q], q) for q in Q)
#print(p)
return path[state],delta
def run_hmm(A, PI, discrete=False):
states = []
probs = []
if discrete:
Y = YtDiscrete
else:
Y = Yt
for key,value in PI.iteritems():
states.append(key)
probs.append(value)
starting_state = np.random.choice(states,p=probs)
prev_state = starting_state
xt = [prev_state]
yt = [Y(prev_state)]
#print(xt,yt)
for t in range(1,T):
state_tpm = A[xt[t-1]]
row_states = []
row_probs = []
for key,value in state_tpm.iteritems():
row_states.append(key)
row_probs.append(value)
next_state = np.random.choice(row_states, p=row_probs)
xt.append(next_state)
yt.append(Y(next_state))
return {'xt': xt, 'yt': yt}
In [4]:
V = ['0','1','2','3','4','5']
Q = ['-1','0','1']
B0 = {}
for q in Q:
B0[q] = {}
for v in V:
vi = int(v)
B0[q][v] = b(vi, int(q))
A0 = {'-1': {'-1': 0.990 , '0': 0.005, '1': 0.005} ,
'0': {'-1': 0.005 , '0': 0.990, '1': 0.005} ,
'1': {'-1': 0.005 , '0': 0.005, '1': 0.990}}
PI0 = {'-1':1/3,'0':1/3,'1':1/3}
In [5]:
V = Q
B0 = A0
np.random.seed(100)
for i in range(0,6):
d = run_hmm(A0, PI0, discrete=True)
xt = d['xt']
yt = d['yt']
plt.scatter(xt, yt, color=colors[i])
plt.title('Y v/s X for discrete distribution[Debug check plot]')
plt.xlabel('x')
plt.ylabel('y')
Out[5]:
In [6]:
fig = plt.figure()
np.random.seed(100)
for i in range(0,6):
d = run_hmm(A0, PI0, discrete=True)
ax = fig.add_subplot(3,2,i+1)
xt = d['xt']
yt = d['yt']
xtp, delta = viterbi(yt,PI0,A0,B0)
plt.scatter(xtp, xt, color=colors[i])
plt.subplots_adjust(hspace = .35)
plt.title('Run {} Viterbi(X Predicted) vs Original(X) [Debug check]'.format(i+1))
plt.xlabel("X Predicted")
plt.ylabel('X Original')
In [7]:
V = ['0','1','2','3','4','5']
Q = ['-1','0','1']
B0 = {}
for q in Q:
B0[q] = {}
for v in V:
vi = int(v)
B0[q][v] = b(vi, int(q))
A0 = {'-1': {'-1': 0.990 , '0': 0.005, '1': 0.005} ,
'0': {'-1': 0.005 , '0': 0.990, '1': 0.005} ,
'1': {'-1': 0.005 , '0': 0.005, '1': 0.990}}
PI0 = {'-1':1/3,'0':1/3,'1':1/3}
In [8]:
np.random.seed(100)
for i in range(0,6):
d = run_hmm(A0, PI0, discrete=False)
xt = d['xt']
yt = d['yt']
plt.scatter(xt, yt, color=colors[i])
plt.title('Y v/s X')
plt.xlabel('x')
plt.ylabel('y')
Out[8]:
In [9]:
fig = plt.figure()
np.random.seed(100)
for i in range(0,6):
d = run_hmm(A0, PI0, discrete=False)
ax = fig.add_subplot(3,2,i+1)
xt = d['xt']
yt = d['yt']
xtp, delta = viterbi(yt,PI0,A0,B0)
plt.scatter(xtp, xt, color=colors[i])
plt.subplots_adjust(hspace = .35)
plt.title('Run {} Viterbi(X Predicted) vs Original(X)'.format(i+1))
plt.xlabel("X Predicted")
plt.ylabel('X Original')
In [15]:
np.random.seed(100)
for i in range(0,6):
d = run_hmm(A0, PI0)
xt = d['xt']
yt = d['yt']
alp = alpha_pass2(yt,A0,B0,PI0)
alpha = alp['alpha']
c = alp['c']
sumlogc = sum([log(x) for x in c])
prob = (-sumlogc)
print(' Run {} log P(O|lambda) = {}'.format(i+1, prob))
In [11]:
import operator
fig = plt.figure()
np.random.seed(100)
for i in range(0,6):
d = run_hmm(A0, PI0)
xt = d['xt']
yt = d['yt']
alp = alpha_pass2(yt,A0,B0,PI0)
alpha = alp['alpha']
c = alp['c']
sumlogc = sum([log(x) for x in c])
prob = (-sumlogc)
beta = beta_pass2(yt,A0,B0,c)
gamm = gamma_pass2(yt, A0, B0, c, alpha, beta)
gamma = gamm['gamma']
xtp = []
for t in range(0,T):
stats = gamma[t]
ml = max(stats.iteritems(), key=operator.itemgetter(1))[0]
xtp.append(ml)
ax = fig.add_subplot(3,2,i+1)
plt.scatter(xtp, xt, color=colors[i])
plt.title('Run {} Viterbi(X Predicted) vs Original(X)'.format(i+1))
plt.xlabel("X Predicted")
plt.ylabel('X Original')
In [12]:
def restimate(gamma, gammaij, c ):
pi1 = {i: gamma[0][i] for i in Q}
Aest = {str(k):{} for k in Q}
for i in Q:
for j in Q:
num = 0
denom = 0
for t in range(0,T-1):
num = num + gammaij[t][i][j]
denom = denom+ gamma[t][i]
Aest[i][j] = num/denom
Best = {k: {} for k in Q}
for i in Q:
for j in V:
num = 0
denom = 0
for t in range(0,T):
if yt[t] == j:
num = num + gamma[t][i]
denom = denom + gamma[t][i]
Best[i][j] = num/denom
logP = -sum(log(x) for x in c)
return {'A': Aest, 'B': Best, 'logP': logP, 'PI': pi1}
In [22]:
def run_estimator(itera):
d = run_hmm(A0, PI0)
xt = d['xt']
yt = d['yt']
notconverge = True
oldlogP = -10e6
iters = 0
Aest = {'-1': {'-1': itera/9 , '0': 1/2-itera/18, '1': 1-itera/18} ,
'0': {'-1': 1/9 , '0': 7/9, '1': 1/9} ,
'1': {'-1': 1/9 , '0': 1/9, '1': 7/9}}
PIest = {'-1':1/3,'0':1/3,'1':1/3}
Best = {}
for q in Q:
Best[q] = {}
for v in V:
vi = int(v)
Best[q][v] = 1/6
while notconverge and iters<300:
iters +=1
alp = alpha_pass2(yt,Aest,Best,PIest)
alpha = alp['alpha']
c = alp['c']
#print(alpha)
beta = beta_pass2(yt,Aest,Best,c)
gamm = gamma_pass2(yt, Aest, Best, c, alpha, beta)
gamma = gamm['gamma']
gammaij = gamm['gammaij']
restimated = restimate(gamma, gammaij, c )
Aest = restimated['A']
Best = restimated['B']
PIest = restimated['PI']
logP = restimated['logP']
#print(iters, logP)
if oldlogP < logP and iters<1000:
oldlogP = logP
else:
#print iters
string=""
for q in Q:
aq=Aest[q]
string +="[{} {} {}]\n".format(aq['-1'],aq['0'],aq['1'])
print 'A:=', string
string=""
for q in Q:
aq=Best[q]
string +="[{} {} {} {} {}]\n".format(aq['0'],aq['1'],aq['2'],aq['3'],aq['4'],aq['5'])
print 'B=', string
string = "["
for q in Q:
string += "{} ".format(PIest[q])
string+="]"
print 'PI=', string
break
np.random.seed(100)
for i in range(1,6):
print '\n\n*****Run {} *********\n'.format(i)
run_estimator(i)
In [ ]:
In [ ]: